package cn.shanguoyu.base.springboot.starter.idempotent.core.strategy.impl.spel;

import cn.hutool.core.text.StrPool;
import cn.hutool.core.util.StrUtil;
import cn.shanguoyu.base.springboot.starter.cache.abs.Cache;
import cn.shanguoyu.base.springboot.starter.idempotent.aop.Idempotent;
import cn.shanguoyu.base.springboot.starter.idempotent.core.IdempotentParamWrapper;
import cn.shanguoyu.base.springboot.starter.idempotent.core.strategy.IdempotentContext;
import cn.shanguoyu.base.springboot.starter.idempotent.core.strategy.IdempotentExecuteHandler;
import cn.shanguoyu.base.springboot.starter.idempotent.core.strategy.abs.AbstractIdempotentTemplate;
import cn.shanguoyu.base.springboot.starter.idempotent.enums.IdempotentMQConsumeStatusEnum;
import cn.shanguoyu.base.springboot.starter.idempotent.toolkit.SpELUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.reflect.MethodSignature;

import java.util.concurrent.TimeUnit;

/**
 * @description:基于SPEL方法验证请求幂等性，适用于MQ场景
 * @author：sgy
 * @date: 2023-06-04
 */
@Slf4j
@RequiredArgsConstructor
public class IdempotentSpELByMQExecuteHandler extends AbstractIdempotentTemplate implements IdempotentExecuteHandler {
    private Cache cache;
    private final static String WRAPPER = "wrapper:spEL:MQ";


    @Override
    protected void buildWrapper(IdempotentParamWrapper idempotentParamWrapper) {
        Idempotent idempotent = idempotentParamWrapper.getIdempotent();
        ProceedingJoinPoint joinPoint = idempotentParamWrapper.getJoinPoint();
        String lockKey = (String) SpELUtil.parseKey(idempotent.key(), ((MethodSignature) joinPoint.getSignature()).getMethod(), joinPoint.getArgs());
        if (StrUtil.isNotBlank(idempotentParamWrapper.getIdempotent().uniqueKeyPrefix())) {
            lockKey = StrUtil.join(idempotentParamWrapper.getIdempotent().uniqueKeyPrefix(), StrPool.COLON, lockKey);
        }
        idempotentParamWrapper.setLockKey(lockKey);
    }

    @Override
    protected void handler(IdempotentParamWrapper wrapper) {
        String key = wrapper.getLockKey();
        Idempotent idempotent = wrapper.getIdempotent();
        long timeout = idempotent.keyTimeout();
        Boolean putIfAbsent = cache.strPutIfAbsent(key, IdempotentMQConsumeStatusEnum.CONSUMING.getCode(), timeout, TimeUnit.SECONDS);
        if (putIfAbsent != null && !putIfAbsent) {
            String consumeStatus = cache.strGet(key);
            boolean error = IdempotentMQConsumeStatusEnum.isError(consumeStatus);
            log.warn("[{}] MQ repeated consumption, {}.", key, error ? "Wait for the client to delay consumption" : "");
            throw new RepeatConsumptionException(error);
        }
        IdempotentContext.put(WRAPPER, wrapper);
    }

    @Override
    public void exceptionProcessing() {
        IdempotentParamWrapper wrapper = (IdempotentParamWrapper) IdempotentContext.getKey(WRAPPER);
        if (wrapper != null) {
            String key = wrapper.getLockKey();
            cache.delete(key);
        }
    }

    @Override
    public void postProcessing() {
        IdempotentParamWrapper wrapper = (IdempotentParamWrapper) IdempotentContext.getKey(WRAPPER);
        if (wrapper != null) {
            Idempotent idempotent = wrapper.getIdempotent();
            String lockKey = wrapper.getLockKey();
            cache.strPut(lockKey, IdempotentMQConsumeStatusEnum.CONSUMED.getCode(), idempotent.keyTimeout(), TimeUnit.SECONDS);
        }
    }
}
